package cn.webank.framework.concurrent;

import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.RevocationListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.webank.framework.exception.SysException;

/**
 * 基于zookeeper的分布式锁
 * 
 * @author biweiqian
 * @author jonyang
 *
 */
public class DistributedZooKeeperLock implements DistributedLock {
	private final static Logger LOG = LoggerFactory.getLogger(DistributedZooKeeperLock.class);

	private InterProcessMutex lock;
	private String lockPath;
	private String moduleId;
	private CuratorFramework client;

	private static Map<Thread, Boolean> lockedThreadMap = new WeakHashMap<Thread, Boolean>();
	private RevocationListener<InterProcessMutex> revocationListener;
	private ConnectionStateListener stateListener = new StateListener();

	public DistributedZooKeeperLock(String moduleId, String lockPath, CuratorFramework client) {
		this.moduleId = moduleId;
		// this.zkConnctString = zkConnctString;
		this.lockPath = lockPath;
		if (!lockPath.endsWith("/")) {
			lockPath += "/";
		}
		this.client = client;
		this.lock = createLock();

		this.revocationListener = new RevocationListener<InterProcessMutex>() {
			@Override
			public void revocationRequested(InterProcessMutex forLock) {
				if (!forLock.isAcquiredInThisProcess()) {
					return;
				}
				try {
					forLock.release();
				} catch (Exception e) {
					LOG.error("DistributedZooKeeperLock revocate error", e);
					// throw new SysException("",e);
				}
			}
		};
	}

	private InterProcessMutex createLock() {
		InterProcessMutex tmpLock = new InterProcessMutex(client, lockPath + moduleId);

		// 协同中断,如果其他线程/进程需要此锁中断时,调用此listener.
		tmpLock.makeRevocable(revocationListener);
		client.getConnectionStateListenable().addListener(stateListener);
		return tmpLock;
	}

	@Override
	public boolean tryLock(long tryLockTimeout, TimeUnit tryLockTimeUnit) {
		boolean acquire = false;

		try {
			acquire = lock.acquire(tryLockTimeout, tryLockTimeUnit);
			if (acquire) {
				lockedThreadMap.put(Thread.currentThread(), Boolean.TRUE);
			}
		} catch (Exception e) {
			throw new SysException("try lock error", e);
		}

		return acquire;
	}

	@Override
	public void unlock() {
		try {
			lock.release();
			if (lockedThreadMap.containsKey(Thread.currentThread())) {
				lockedThreadMap.remove(Thread.currentThread());
			}
		} catch (Exception e) {
			throw new SysException("unlock lock error", e);
		}

	}

	class StateListener implements ConnectionStateListener {
		@Override
		public void stateChanged(CuratorFramework client, ConnectionState newState) {

			// 如果当前lock没有获取锁,则忽略
			Boolean lockFlag = lockedThreadMap.get(Thread.currentThread());
			if ((lockFlag == null) || (!lockFlag)) {
				return;
			}
			switch (newState) {
			case LOST:
				// 一旦丢失链接,就意味着zk server端已经删除了锁数据
				lockedThreadMap.clear();
				lock = createLock(); // must be rebuild
				break;
			default:
				LOG.debug(newState.toString());
			}
		}
	}

}
